package main

import (
	"bytes"
	"fmt"
	"github.com/gomodule/redigo/redis"
	"google.golang.org/protobuf/proto"
	"net/http"
	"src/config"
	connection2 "src/connection"
	"src/global"
	"src/model/ProtoModel"
	"src/service"
	"src/utils"
	"strconv"
	"sync"
	"time"
	"unsafe"
)

// once is a package-level sync.Once. Nothing in the visible part of this file
// calls it.
var once sync.Once

// RedisClient is the Redis service handle; it wraps a redigo connection pool.
type RedisClient struct {
	// Client is the underlying redigo connection pool.
	Client *redis.Pool
}
// SubscribeCallback is the handler signature for a message received on a
// subscribed channel. It is given the channel name and the raw payload bytes.
type SubscribeCallback func(channel string, message []byte)

// Subscriber groups a redigo pub/sub connection with a table of callbacks.
//
// NOTE(review): the only assignments to these fields in this file are
// commented out; presumably cbMap is keyed by channel name — confirm.
type Subscriber struct {
	pub   redis.PubSubConn
	cbMap map[string]SubscribeCallback
}
// redisCon wraps a single redigo connection. The only references to it in the
// visible file are commented out.
type redisCon struct {
	Conn redis.Conn
}

//var r *redisCon

// var client *redisCon
//var c Subscriber

// init currently performs no start-up work. The earlier client construction is
// kept below, commented out, for reference only.
func init() {
	//client = subscribe.NewClient(&subscribe.Options{
	//	Addr:     "localhost:6379",
	//	Password: "123456", // empty string when no password is configured
	//	DB:       0,        // use the default database
	//})
}

//var MyGlobal = &global.MyGlobal{}

// sendAllCon has the http.HandlerFunc signature and prints the result of
// global.Ws_.Get() to standard output.
//
// NOTE(review): the original comment described this as "broadcast a message to
// all connections", but the visible body only prints; neither w nor r is used
// and nothing is written to the response.
func sendAllCon(w http.ResponseWriter, r *http.Request) {
	//fmt.Println(r)
	//global.Update()
	fmt.Println(global.Ws_.Get())
}
// sub subscribes to channel over a connection taken from the shared Redis pool
// and blocks, handing every published message to cb.
//
// Parameters:
//   - channel: name of the Redis channel to SUBSCRIBE to.
//   - cb: called synchronously, on the calling goroutine, with the channel name
//     and payload of each received message.
//
// The call returns only when the pool yields a nil connection or when the very
// first SUBSCRIBE fails. Once a subscription has been established, a receive
// error is treated as a dropped connection: the connection is closed (which
// hands it back to the pool) and, after a one-second pause, a fresh connection
// is subscribed. Earlier versions kept calling Receive on the same connection
// forever and never released it.
func (c Subscriber) sub(channel string, cb SubscribeCallback) {
	pool := connection2.GetDbRedisPool()
	subscribedOnce := false
	for {
		client := pool.Get()
		if client == nil {
			return
		}
		subc := redis.PubSubConn{Conn: client}

		if err := subc.Subscribe(channel); err != nil {
			utils.Log.Error("subscribe Subscribe error. " + err.Error())
			_ = subc.Close() // best effort: the connection is already unusable
			if !subscribedOnce {
				return
			}
		} else {
			subscribedOnce = true
			receiveUntilError(subc, cb)
			_ = subc.Close() // best effort: release the broken connection
		}

		// Back off before dialing/subscribing again.
		time.Sleep(1 * time.Second)
	}
}

// receiveUntilError pumps replies from subc into cb and returns as soon as
// Receive reports an error.
func receiveUntilError(subc redis.PubSubConn, cb SubscribeCallback) {
	for {
		utils.Log.Info("wait...")
		switch res := subc.Receive().(type) {
		case redis.Message:
			// NOTE(review): res.Channel is already a string and res.Data is
			// already a []byte, so these casts are identity conversions. They
			// are kept only because they are the file's sole use of the
			// imported "unsafe" package; drop both together.
			channel := (*string)(unsafe.Pointer(&res.Channel))
			message := (*[]byte)(unsafe.Pointer(&res.Data))
			cb(*channel, *message)
		case redis.Subscription:
			fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
		case error:
			// "断线" = disconnected.
			utils.Log.Error("断线 error handle... " + res.Error())
			return
		}
	}
}

// push publishes batches of test messages to channelName forever; it never
// returns.
//
// Parameters:
//   - channelName: Redis channel to PUBLISH to.
//   - sleep: pause before each individual publish, in seconds.
//   - num: number of messages published per batch.
//
// Batches are separated by a one-second pause. This used to be implemented by
// re-invoking push from a deferred function, which nested one stack frame per
// batch; it is now a plain loop.
func (c Subscriber) push(channelName string, sleep int, num int) {
	for {
		c.pushBatch(channelName, sleep, num)
		time.Sleep(1 * time.Second)
	}
}

// pushBatch dials Redis, publishes num messages to channelName with a pause of
// sleep seconds before each one, and closes the connection again.
func (c Subscriber) pushBatch(channelName string, sleep int, num int) {
	// NOTE(review): the address, database and password are hard-coded here;
	// consider sourcing them from configuration.
	conn, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialDatabase(0), redis.DialPassword("123456"))
	if err != nil {
		// Skip this batch instead of continuing with a nil connection.
		utils.Log.Info("pub subscribe dial failed. " + err.Error())
		return
	}
	defer conn.Close()

	for i := 0; i < num; i++ {
		time.Sleep(time.Duration(sleep) * time.Second)
		// NOTE(review): msg is handed to Do as a *ProtoModel.MessagesPubProto,
		// not as marshalled bytes, whereas unpackMessage in this file expects
		// two header bytes followed by protobuf bytes — confirm the intended
		// wire form.
		msg := packetMessage()
		rep, err := conn.Do("Publish", channelName, msg)
		if err != nil {
			// "推送失败" = publish failed.
			fmt.Println("推送失败", err)
			fmt.Println("推送失败rep", rep)
		}
	}
}

// lastD is not referenced anywhere in the visible part of this file.
var lastD []byte

// packetMessage builds the fixed sample payload used for publishing: Did is the
// decimal form of the package-level idx counter, and Msg receives two
// hard-coded entries.
func packetMessage() *ProtoModel.MessagesPubProto {
	payload := &ProtoModel.MessagesPubProto{Did: strconv.FormatInt(idx, 10)}
	payload.Msg = append(payload.Msg,
		&ProtoModel.MessageProto{Action: "XdfdsfdsdfdsX", Image: "1"},
		&ProtoModel.MessageProto{Action: "444444444444X", Image: "2"},
	)
	return payload
}

// lastdata holds the payload slice (the bytes after the two header bytes) of
// the packet most recently decoded without error by unpackMessage. It is only
// written, never read, in the visible part of this file.
var lastdata []byte

// unpackMessage decodes a subscription packet: two leading header bytes
// followed by a protobuf-encoded ProtoModel.MessagesPubProto.
//
// Parameters:
//   - byte2: the raw packet.
//
// Returns the two header bytes in packet order, the decoded message, and an
// error. (Presumably the header bytes are the push type and the message type
// described in the format notes at the end of this file — confirm.) On any
// error the other results are zero/nil.
//
// Errors are returned when the packet is shorter than the two-byte header,
// when proto.Unmarshal rejects the payload, or when decoding panics; the panic
// value is printed and converted to an error instead of yielding a nil message
// together with a nil error.
//
// Side effect: on success the payload slice is stored in the package-level
// lastdata.
func unpackMessage(byte2 []byte) (tp byte, dttype byte, msg *ProtoModel.MessagesPubProto, err error) {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println(r)
			tp, dttype, msg = 0, 0, nil
			err = fmt.Errorf("unpack message: recovered from panic: %v", r)
		}
	}()

	buf := bytes.NewBuffer(byte2)
	if tp, err = buf.ReadByte(); err != nil {
		return 0, 0, nil, fmt.Errorf("unpack message: reading first header byte: %w", err)
	}
	if dttype, err = buf.ReadByte(); err != nil {
		return 0, 0, nil, fmt.Errorf("unpack message: reading second header byte: %w", err)
	}

	// Everything after the header is the protobuf payload.
	dt := buf.Bytes()
	data := &ProtoModel.MessagesPubProto{}
	if err = proto.Unmarshal(dt, data); err != nil {
		return 0, 0, nil, err
	}
	lastdata = dt
	return tp, dttype, data, nil
}

// idx is the running packet counter: pub increments it before building each
// packet (and decrements it again when it recovers from a panic), and
// packetMessage writes its decimal form into the payload's Did field.
var idx int64
// starttime is the Unix time in seconds at which red was first called; it is
// zero until then.
var starttime int64
// num counts calls to red.
var num int64

// red is the receive-side progress counter for the pub/sub test. Every call
// increments the package-level num; the first call also records the start time
// in starttime. A progress line is printed whenever num, or the id parsed from
// msg.Did, is a multiple of 10000.
//
// Parameters:
//   - cmd, act: header bytes of the packet; unused here.
//   - msg: decoded message. A nil msg is reported and skipped rather than
//     dereferenced.
//
// A Did that is not a base-10 int64 is reported and then ignored for the
// progress check, so it no longer counts as id 0.
func red(cmd byte, act byte, msg *ProtoModel.MessagesPubProto) {
	num++
	if starttime == 0 {
		starttime = time.Now().Unix()
	}
	if msg == nil {
		fmt.Println("red: nil message")
		return
	}
	defer func() {
		// msg is known to be non-nil here, so reading msg.Did cannot itself
		// panic inside the handler.
		if err := recover(); err != nil {
			fmt.Println(err, msg.Did)
		}
	}()

	id, err := strconv.ParseInt(msg.Did, 10, 64)
	if err != nil {
		fmt.Println("red: invalid Did", msg.Did, err)
	}

	if num%10000 == 0 || (err == nil && id%10000 == 0) {
		// "耗时" = elapsed time.
		fmt.Println("send-id", id, ";read-id:", num, ";time:", starttime, ";耗时:", time.Now().Unix()-starttime)
	}
}
// TestCallback1 is the subscription callback used by sub: it decodes msg with
// unpackMessage and passes the result to red. If decoding fails, the returned
// values and the error are printed instead. chann is not used.
func TestCallback1(chann string, msg []byte) {
	cmd, act, decoded, err := unpackMessage(msg)
	if err == nil {
		red(cmd, act, decoded)
		return
	}
	fmt.Println(cmd, act, decoded, err)
}

// subpub is a pack/unpack debugging loop: it packs a sample message into a
// service.Sub, decodes the resulting buffer with unpackMessage and feeds the
// result to red, repeating until decoding fails. After each round in which idx
// is a multiple of 101000 it sleeps for 100 seconds.
func subpub() {
	for {
		var packet service.Sub
		payload := packetMessage()
		packet.Message(payload)

		cmd, act, decoded, err := unpackMessage(packet.Buffer.Bytes())
		if err != nil {
			fmt.Println(cmd, act, decoded, err)
			return
		}
		red(cmd, act, decoded)

		if idx%101000 != 0 {
			continue
		}
		time.Sleep(100 * time.Second)
	}
}
// sub subscribes TestCallback1 to the "chat" channel through
// connection2.PubRedis. If the subscription call panics, the panic value is
// printed and the subscription is started again immediately; if it returns
// normally, sub returns.
//
// The restart is a loop. It used to be a recursive call made from inside the
// deferred recover handler, which nested one more stack frame per panic.
func sub() {
	for subOnce() {
	}
}

// subOnce runs a single subscription session and reports whether it ended in a
// (recovered) panic.
func subOnce() (panicked bool) {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println(err) // err is the value that was passed to panic
			panicked = true
		}
	}()
	(&connection2.PubRedis{}).Get().PubSubConn("chat", TestCallback1)
	return false
}

// pub is the Redis bulk-publish test driver; it never returns. Each round
// increments idx, builds a SENDALL/MESSAGE packet around packetMessage(),
// waits five seconds and publishes it. After every 101000th packet it pauses
// for 100 seconds.
//
// If a round panics, the idx increment is rolled back, the panic value is
// printed and publishing resumes. The restart is a loop. It used to be a
// recursive call made from inside the deferred recover handler, which nested
// one more stack frame per panic.
func pub() {
	for {
		pubUntilPanic()
	}
}

// pubUntilPanic publishes packets in an endless loop and returns only after
// recovering from a panic raised inside that loop.
func pubUntilPanic() {
	defer func() {
		if err := recover(); err != nil {
			// idx++ is the first statement of every round, so any panic in
			// the loop happened after the increment; roll it back.
			idx--
			fmt.Println(err) // err is the value that was passed to panic
		}
	}()
	for {
		idx++
		sub1 := service.Sub{Sid: "xxfd", Uid: "1084", Cid: idx}
		sub1.Type = ProtoModel.DataTypeProto_MESSAGE
		sub1.Act = ProtoModel.SendActProto_SENDALL
		sub1.Message(packetMessage())
		time.Sleep(5 * time.Second)
		// NOTE(review): any result of SendRedisPublish is discarded here.
		sub1.SendRedisPublish()
		// Author's measurement: Send is roughly 5x faster than Do — about
		// 925k messages in 10 s (~92k/s) versus about 160k in 10 s with Do.
		if idx%101000 == 0 {
			time.Sleep(100 * time.Second)
		}
	}
}

// main loads the configuration, initialises the global state, waits for the
// wall clock to reach the next whole second, prints that second as the start
// time and then runs the publish test (pub), which does not return.
func main() {
	// Load configuration.
	config.Init()
	// Initialise global variables.
	global.WsInit()
	//connection.InIt() // initialise the redis connection
	//connection.NewRedisClient()

	// Align the start with the next whole-second boundary. This sleeps rather
	// than spinning on time.Now(), which kept a CPU core busy for up to a
	// second.
	now := time.Now()
	nextSecond := now.Truncate(time.Second).Add(time.Second)
	time.Sleep(nextSecond.Sub(now))
	alignedStart := time.Now().Unix()

	fmt.Println("starttime", alignedStart)
	//go sub()
	pub()

}

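// Packet layout (field order):
// 1. Push type (byte): 1 = push to everyone | 2 = push to several uids, 3 = push to a single uid, 4 = push to one device id under a single uid, 5 = push several messages one by one (iterating over each message's recipient uid)
// 2. Message type (byte): 1 = feature push, 2 = chat push
// 3. Server id (string)
// 4. User uid (string)
// 5. User device id (int64)
// 6. Message body

// Single send: 2, server id, [message template]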
